diff --git a/src/main/java/cn/nextop/rxjava/share/practices/Practice1.java b/src/main/java/cn/nextop/rxjava/share/practices/Practice1.java index b432fea..7d1afa7 100644 --- a/src/main/java/cn/nextop/rxjava/share/practices/Practice1.java +++ b/src/main/java/cn/nextop/rxjava/share/practices/Practice1.java @@ -19,6 +19,9 @@ import cn.nextop.rxjava.share.util.type.Tuple2; import io.reactivex.Observable; +import static cn.nextop.rxjava.share.util.Tuples.of; + + /** * @author Baoyi Chen */ @@ -30,6 +33,6 @@ public class Practice1 { * 返回值 Observable[(1, "a"), (2, "b"), (3, "c")] 注意index从1开始 */ public Observable> indexable(Observable observable) { - throw new UnsupportedOperationException("implementation"); + return observable.map(i -> of(1, i)).scan((v1, v2) -> of(v1.getV1() + 1, v2.getV2())); } } diff --git a/src/main/java/cn/nextop/rxjava/share/practices/Practice2.java b/src/main/java/cn/nextop/rxjava/share/practices/Practice2.java index 08c7dcd..ca75190 100644 --- a/src/main/java/cn/nextop/rxjava/share/practices/Practice2.java +++ b/src/main/java/cn/nextop/rxjava/share/practices/Practice2.java @@ -21,8 +21,11 @@ import io.reactivex.Observable; import io.reactivex.Single; +import java.util.HashMap; import java.util.Map; +import static cn.nextop.rxjava.share.util.Tuples.of; + /** * @author Baoyi Chen */ @@ -34,7 +37,8 @@ public class Practice2 { * 返回: Observable[("a", 2), ("b", 1), ("c", 2)] */ public Observable> wordCount1(Observable words) { - throw new UnsupportedOperationException("implementation"); + // TODO: confused ? + return words.groupBy(w -> w).flatMap(g -> g.count().map(c -> of(g.getKey(), c.intValue())).toObservable()); } /* @@ -43,7 +47,7 @@ public Observable> wordCount1(Observable words) * 返回: Single[Map{a=2, b=1, c=2}] */ public Single> wordCount2(Observable words) { - throw new UnsupportedOperationException("implementation"); + return words.reduceWith(() -> new HashMap<>(), (m, k) -> { m.put(k, m.containsKey(k) ? m.get(k) + 1 : 1); return m; }); } } diff --git a/src/main/java/cn/nextop/rxjava/share/practices/Practice3.java b/src/main/java/cn/nextop/rxjava/share/practices/Practice3.java index a43bd78..00451f7 100644 --- a/src/main/java/cn/nextop/rxjava/share/practices/Practice3.java +++ b/src/main/java/cn/nextop/rxjava/share/practices/Practice3.java @@ -19,6 +19,9 @@ import io.reactivex.Maybe; import io.reactivex.Observable; +import java.util.ArrayList; +import java.util.List; + /** * @author Baoyi Chen */ @@ -28,7 +31,7 @@ public class Practice3 { * 根据iterate的结果求和 */ public Maybe sum(Observable observable) { - throw new UnsupportedOperationException("implementation"); + return this.iterate(observable).reduce((x, y) -> x + y); } /* @@ -42,7 +45,10 @@ public Maybe sum(Observable observable) { * return Observable[4, 3, 6, 7, 5] 顺序无关 */ public Observable iterate(Observable observable) { - throw new UnsupportedOperationException("implementation"); + // TODO: 直接定义一个mapper function 应该更decent + return observable.flatMapIterable(n -> { + List r = new ArrayList<>(8); this.flat(r, n); return r; + }).map(i -> i.value); } public static class Node { @@ -57,4 +63,9 @@ public Node(Node left, Node right, int value) { } } + protected void flat(List list, Node node) { + if (node == null) return; + list.add(node); this.flat(list, node.left); this.flat(list, node.right); + } + } diff --git a/src/main/java/cn/nextop/rxjava/share/practices/Practice4.java b/src/main/java/cn/nextop/rxjava/share/practices/Practice4.java index 33a5804..1b03fe4 100644 --- a/src/main/java/cn/nextop/rxjava/share/practices/Practice4.java +++ b/src/main/java/cn/nextop/rxjava/share/practices/Practice4.java @@ -19,6 +19,9 @@ import io.reactivex.Observable; +import static io.reactivex.Observable.just; +import static io.reactivex.schedulers.Schedulers.io; + /** * @author Baoyi Chen @@ -44,7 +47,7 @@ public class Practice4 { * */ public Observable runInMultiThread(Observable observable) { - throw new UnsupportedOperationException("implementation"); + return observable.concatMap(o -> just(o).observeOn(io())); } } diff --git a/src/main/java/cn/nextop/rxjava/share/practices/Practice5.java b/src/main/java/cn/nextop/rxjava/share/practices/Practice5.java index 1193642..d07538d 100644 --- a/src/main/java/cn/nextop/rxjava/share/practices/Practice5.java +++ b/src/main/java/cn/nextop/rxjava/share/practices/Practice5.java @@ -24,6 +24,9 @@ import java.util.concurrent.TimeUnit; import java.util.function.Predicate; +import static io.reactivex.Observable.empty; +import static io.reactivex.Observable.just; + /** * @author Baoyi Chen */ @@ -35,7 +38,7 @@ public class Practice5 { * return: Single[3] */ public Single count(Observable source) { - throw new UnsupportedOperationException("implementation"); + return source.reduce(0L, (i, s) -> i + 1); } /* @@ -44,7 +47,7 @@ public Single count(Observable source) { * return: Observable["a", "b", "c","b", "c", "d"] */ public Observable convert(Observable> source) { - throw new UnsupportedOperationException("implementation"); + return source.flatMap(s -> Observable.fromIterable(s)); } /* @@ -53,7 +56,7 @@ public Observable convert(Observable> source) { * return: Observable["a", "b", "c"] */ public Observable distinct(Observable source) { - throw new UnsupportedOperationException("implementation"); + return source.groupBy(s -> s).map(s -> s.getKey()); } /* @@ -62,7 +65,7 @@ public Observable distinct(Observable source) { * return: Observable[3, 4] */ public Observable filter(Observable source, Predicate conditon) { - throw new UnsupportedOperationException("implementation"); + return source.flatMap(s -> conditon.test(s) ? just(s) : empty()); } /* @@ -71,7 +74,7 @@ public Observable filter(Observable source, Predicate * return: Maybe[3] */ public Maybe elementAt(Observable source, int index) { - throw new UnsupportedOperationException("implementation"); + return source.skip(index).take(1).firstElement(); } /* @@ -80,7 +83,7 @@ public Maybe elementAt(Observable source, int index) { * return: Observable["a", "b", "a", "b"] */ public Observable repeat(Observable source, int count) { - throw new UnsupportedOperationException("implementation"); + return Observable.range(0, count).flatMap(s -> source); } /* @@ -89,7 +92,7 @@ public Observable repeat(Observable source, int count) { * return: Observable["a", "b"] */ public Observable concat(List> source) { - throw new UnsupportedOperationException("implementation"); + return Observable.fromIterable(source).concatMap(s -> s); } /* @@ -98,7 +101,7 @@ public Observable concat(List> source) { * return: Observable["a", "b"] */ public Observable merge(List> source) { - throw new UnsupportedOperationException("implementation"); + return Observable.fromIterable(source).flatMap(s -> s); } /* @@ -107,7 +110,7 @@ public Observable merge(List> source) { * return: Observable["a", "b", "c"], 每个元素都延迟1秒 */ public Observable delayAll(Observable source, long delay, TimeUnit unit) { - throw new UnsupportedOperationException("implementation"); + return source.map(s -> { unit.sleep(delay); return s; }); } }