From a0b79e8988b5c8341f96e80b7c7bd667b14bab7d Mon Sep 17 00:00:00 2001 From: sunjz Date: Mon, 26 Mar 2018 16:33:05 +0800 Subject: [PATCH 1/3] RxJava practice --- .../rxjava/share/practices/Practice1.java | 5 ++++- .../rxjava/share/practices/Practice2.java | 20 +++++++++++++++++-- .../rxjava/share/practices/Practice3.java | 15 ++++++++++++-- .../rxjava/share/practices/Practice4.java | 5 ++++- .../rxjava/share/practices/Practice5.java | 18 ++++++++--------- 5 files changed, 48 insertions(+), 15 deletions(-) diff --git a/src/main/java/cn/nextop/rxjava/share/practices/Practice1.java b/src/main/java/cn/nextop/rxjava/share/practices/Practice1.java index b432fea..7d1afa7 100644 --- a/src/main/java/cn/nextop/rxjava/share/practices/Practice1.java +++ b/src/main/java/cn/nextop/rxjava/share/practices/Practice1.java @@ -19,6 +19,9 @@ import cn.nextop.rxjava.share.util.type.Tuple2; import io.reactivex.Observable; +import static cn.nextop.rxjava.share.util.Tuples.of; + + /** * @author Baoyi Chen */ @@ -30,6 +33,6 @@ public class Practice1 { * 返回值 Observable[(1, "a"), (2, "b"), (3, "c")] 注意index从1开始 */ public Observable> indexable(Observable observable) { - throw new UnsupportedOperationException("implementation"); + return observable.map(i -> of(1, i)).scan((v1, v2) -> of(v1.getV1() + 1, v2.getV2())); } } diff --git a/src/main/java/cn/nextop/rxjava/share/practices/Practice2.java b/src/main/java/cn/nextop/rxjava/share/practices/Practice2.java index 08c7dcd..38a28a4 100644 --- a/src/main/java/cn/nextop/rxjava/share/practices/Practice2.java +++ b/src/main/java/cn/nextop/rxjava/share/practices/Practice2.java @@ -17,12 +17,16 @@ package cn.nextop.rxjava.share.practices; +import cn.nextop.rxjava.share.util.Maps; import cn.nextop.rxjava.share.util.type.Tuple2; import io.reactivex.Observable; import io.reactivex.Single; +import java.util.HashMap; import java.util.Map; +import static cn.nextop.rxjava.share.util.Tuples.of; + /** * @author Baoyi Chen */ @@ -34,7 +38,8 @@ public class Practice2 { * 返回: Observable[("a", 2), ("b", 1), ("c", 2)] */ public Observable> wordCount1(Observable words) { - throw new UnsupportedOperationException("implementation"); + // TODO: confused ? + return words.groupBy(w -> w).map(g -> g.count().map(c -> of(g.getKey(), c.intValue())).toObservable()).flatMap(e -> e); } /* @@ -43,7 +48,18 @@ public Observable> wordCount1(Observable words) * 返回: Single[Map{a=2, b=1, c=2}] */ public Single> wordCount2(Observable words) { - throw new UnsupportedOperationException("implementation"); + // TODO: confused ? + return words.reduce(new HashMap<>(), this::reduce); + } + + protected Map reduce(Map map, String key) { + if (Maps.isEmpty(map)) { + map = new HashMap<>(); + } + + Integer i = map.get(key); + map.put(key, i == null ? 0 : i++); + return map; } } diff --git a/src/main/java/cn/nextop/rxjava/share/practices/Practice3.java b/src/main/java/cn/nextop/rxjava/share/practices/Practice3.java index a43bd78..00451f7 100644 --- a/src/main/java/cn/nextop/rxjava/share/practices/Practice3.java +++ b/src/main/java/cn/nextop/rxjava/share/practices/Practice3.java @@ -19,6 +19,9 @@ import io.reactivex.Maybe; import io.reactivex.Observable; +import java.util.ArrayList; +import java.util.List; + /** * @author Baoyi Chen */ @@ -28,7 +31,7 @@ public class Practice3 { * 根据iterate的结果求和 */ public Maybe sum(Observable observable) { - throw new UnsupportedOperationException("implementation"); + return this.iterate(observable).reduce((x, y) -> x + y); } /* @@ -42,7 +45,10 @@ public Maybe sum(Observable observable) { * return Observable[4, 3, 6, 7, 5] 顺序无关 */ public Observable iterate(Observable observable) { - throw new UnsupportedOperationException("implementation"); + // TODO: 直接定义一个mapper function 应该更decent + return observable.flatMapIterable(n -> { + List r = new ArrayList<>(8); this.flat(r, n); return r; + }).map(i -> i.value); } public static class Node { @@ -57,4 +63,9 @@ public Node(Node left, Node right, int value) { } } + protected void flat(List list, Node node) { + if (node == null) return; + list.add(node); this.flat(list, node.left); this.flat(list, node.right); + } + } diff --git a/src/main/java/cn/nextop/rxjava/share/practices/Practice4.java b/src/main/java/cn/nextop/rxjava/share/practices/Practice4.java index 33a5804..1b03fe4 100644 --- a/src/main/java/cn/nextop/rxjava/share/practices/Practice4.java +++ b/src/main/java/cn/nextop/rxjava/share/practices/Practice4.java @@ -19,6 +19,9 @@ import io.reactivex.Observable; +import static io.reactivex.Observable.just; +import static io.reactivex.schedulers.Schedulers.io; + /** * @author Baoyi Chen @@ -44,7 +47,7 @@ public class Practice4 { * */ public Observable runInMultiThread(Observable observable) { - throw new UnsupportedOperationException("implementation"); + return observable.concatMap(o -> just(o).observeOn(io())); } } diff --git a/src/main/java/cn/nextop/rxjava/share/practices/Practice5.java b/src/main/java/cn/nextop/rxjava/share/practices/Practice5.java index 1193642..12ea9d0 100644 --- a/src/main/java/cn/nextop/rxjava/share/practices/Practice5.java +++ b/src/main/java/cn/nextop/rxjava/share/practices/Practice5.java @@ -35,7 +35,7 @@ public class Practice5 { * return: Single[3] */ public Single count(Observable source) { - throw new UnsupportedOperationException("implementation"); + return source.count(); } /* @@ -44,7 +44,7 @@ public Single count(Observable source) { * return: Observable["a", "b", "c","b", "c", "d"] */ public Observable convert(Observable> source) { - throw new UnsupportedOperationException("implementation"); + return source.flatMapIterable(s -> s); } /* @@ -53,7 +53,7 @@ public Observable convert(Observable> source) { * return: Observable["a", "b", "c"] */ public Observable distinct(Observable source) { - throw new UnsupportedOperationException("implementation"); + return source.distinct(); } /* @@ -62,7 +62,7 @@ public Observable distinct(Observable source) { * return: Observable[3, 4] */ public Observable filter(Observable source, Predicate conditon) { - throw new UnsupportedOperationException("implementation"); + return source.filter(s -> conditon.test(s)); } /* @@ -71,7 +71,7 @@ public Observable filter(Observable source, Predicate * return: Maybe[3] */ public Maybe elementAt(Observable source, int index) { - throw new UnsupportedOperationException("implementation"); + return source.elementAt(index); } /* @@ -80,7 +80,7 @@ public Maybe elementAt(Observable source, int index) { * return: Observable["a", "b", "a", "b"] */ public Observable repeat(Observable source, int count) { - throw new UnsupportedOperationException("implementation"); + return source.repeat(count); } /* @@ -89,7 +89,7 @@ public Observable repeat(Observable source, int count) { * return: Observable["a", "b"] */ public Observable concat(List> source) { - throw new UnsupportedOperationException("implementation"); + return Observable.concat(source); } /* @@ -98,7 +98,7 @@ public Observable concat(List> source) { * return: Observable["a", "b"] */ public Observable merge(List> source) { - throw new UnsupportedOperationException("implementation"); + return Observable.merge(source); } /* @@ -107,7 +107,7 @@ public Observable merge(List> source) { * return: Observable["a", "b", "c"], 每个元素都延迟1秒 */ public Observable delayAll(Observable source, long delay, TimeUnit unit) { - throw new UnsupportedOperationException("implementation"); + return source.map(s -> { unit.sleep(delay); return s; }); } } From 2807bdc7884c702a96460976ef9006961f6f1c38 Mon Sep 17 00:00:00 2001 From: sunjz Date: Mon, 26 Mar 2018 17:07:59 +0800 Subject: [PATCH 2/3] bugfix --- .../rxjava/share/practices/Practice2.java | 14 +------------- .../rxjava/share/practices/Practice5.java | 19 +++++++++++-------- 2 files changed, 12 insertions(+), 21 deletions(-) diff --git a/src/main/java/cn/nextop/rxjava/share/practices/Practice2.java b/src/main/java/cn/nextop/rxjava/share/practices/Practice2.java index 38a28a4..a3e26c7 100644 --- a/src/main/java/cn/nextop/rxjava/share/practices/Practice2.java +++ b/src/main/java/cn/nextop/rxjava/share/practices/Practice2.java @@ -17,7 +17,6 @@ package cn.nextop.rxjava.share.practices; -import cn.nextop.rxjava.share.util.Maps; import cn.nextop.rxjava.share.util.type.Tuple2; import io.reactivex.Observable; import io.reactivex.Single; @@ -48,18 +47,7 @@ public Observable> wordCount1(Observable words) * 返回: Single[Map{a=2, b=1, c=2}] */ public Single> wordCount2(Observable words) { - // TODO: confused ? - return words.reduce(new HashMap<>(), this::reduce); - } - - protected Map reduce(Map map, String key) { - if (Maps.isEmpty(map)) { - map = new HashMap<>(); - } - - Integer i = map.get(key); - map.put(key, i == null ? 0 : i++); - return map; + return words.reduce(new HashMap<>(), (m, k) -> { m.put(k, m.containsKey(k) ? m.get(k) + 1 : 1); return m; }); } } diff --git a/src/main/java/cn/nextop/rxjava/share/practices/Practice5.java b/src/main/java/cn/nextop/rxjava/share/practices/Practice5.java index 12ea9d0..d07538d 100644 --- a/src/main/java/cn/nextop/rxjava/share/practices/Practice5.java +++ b/src/main/java/cn/nextop/rxjava/share/practices/Practice5.java @@ -24,6 +24,9 @@ import java.util.concurrent.TimeUnit; import java.util.function.Predicate; +import static io.reactivex.Observable.empty; +import static io.reactivex.Observable.just; + /** * @author Baoyi Chen */ @@ -35,7 +38,7 @@ public class Practice5 { * return: Single[3] */ public Single count(Observable source) { - return source.count(); + return source.reduce(0L, (i, s) -> i + 1); } /* @@ -44,7 +47,7 @@ public Single count(Observable source) { * return: Observable["a", "b", "c","b", "c", "d"] */ public Observable convert(Observable> source) { - return source.flatMapIterable(s -> s); + return source.flatMap(s -> Observable.fromIterable(s)); } /* @@ -53,7 +56,7 @@ public Observable convert(Observable> source) { * return: Observable["a", "b", "c"] */ public Observable distinct(Observable source) { - return source.distinct(); + return source.groupBy(s -> s).map(s -> s.getKey()); } /* @@ -62,7 +65,7 @@ public Observable distinct(Observable source) { * return: Observable[3, 4] */ public Observable filter(Observable source, Predicate conditon) { - return source.filter(s -> conditon.test(s)); + return source.flatMap(s -> conditon.test(s) ? just(s) : empty()); } /* @@ -71,7 +74,7 @@ public Observable filter(Observable source, Predicate * return: Maybe[3] */ public Maybe elementAt(Observable source, int index) { - return source.elementAt(index); + return source.skip(index).take(1).firstElement(); } /* @@ -80,7 +83,7 @@ public Maybe elementAt(Observable source, int index) { * return: Observable["a", "b", "a", "b"] */ public Observable repeat(Observable source, int count) { - return source.repeat(count); + return Observable.range(0, count).flatMap(s -> source); } /* @@ -89,7 +92,7 @@ public Observable repeat(Observable source, int count) { * return: Observable["a", "b"] */ public Observable concat(List> source) { - return Observable.concat(source); + return Observable.fromIterable(source).concatMap(s -> s); } /* @@ -98,7 +101,7 @@ public Observable concat(List> source) { * return: Observable["a", "b"] */ public Observable merge(List> source) { - return Observable.merge(source); + return Observable.fromIterable(source).flatMap(s -> s); } /* From 2ed86714f39ea8ee89a9bd2fb5ff28f7121049d8 Mon Sep 17 00:00:00 2001 From: sunjz Date: Tue, 27 Mar 2018 14:54:05 +0800 Subject: [PATCH 3/3] bugfix --- src/main/java/cn/nextop/rxjava/share/practices/Practice2.java | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/src/main/java/cn/nextop/rxjava/share/practices/Practice2.java b/src/main/java/cn/nextop/rxjava/share/practices/Practice2.java index a3e26c7..ca75190 100644 --- a/src/main/java/cn/nextop/rxjava/share/practices/Practice2.java +++ b/src/main/java/cn/nextop/rxjava/share/practices/Practice2.java @@ -38,7 +38,7 @@ public class Practice2 { */ public Observable> wordCount1(Observable words) { // TODO: confused ? - return words.groupBy(w -> w).map(g -> g.count().map(c -> of(g.getKey(), c.intValue())).toObservable()).flatMap(e -> e); + return words.groupBy(w -> w).flatMap(g -> g.count().map(c -> of(g.getKey(), c.intValue())).toObservable()); } /* @@ -47,7 +47,7 @@ public Observable> wordCount1(Observable words) * 返回: Single[Map{a=2, b=1, c=2}] */ public Single> wordCount2(Observable words) { - return words.reduce(new HashMap<>(), (m, k) -> { m.put(k, m.containsKey(k) ? m.get(k) + 1 : 1); return m; }); + return words.reduceWith(() -> new HashMap<>(), (m, k) -> { m.put(k, m.containsKey(k) ? m.get(k) + 1 : 1); return m; }); } }