From 4843af21aa74dfc8bc07d502a06e8f167c769816 Mon Sep 17 00:00:00 2001 From: qutl Date: Mon, 26 Mar 2018 17:42:07 +0800 Subject: [PATCH 1/2] z --- .../rxjava/share/practices/Practice1.java | 5 +++- .../rxjava/share/practices/Practice2.java | 24 +++++++++++++---- .../rxjava/share/practices/Practice3.java | 17 +++++++++--- .../rxjava/share/practices/Practice4.java | 3 ++- .../rxjava/share/practices/Practice5.java | 26 +++++++++---------- 5 files changed, 51 insertions(+), 24 deletions(-) diff --git a/src/main/java/cn/nextop/rxjava/share/practices/Practice1.java b/src/main/java/cn/nextop/rxjava/share/practices/Practice1.java index b432fea..3921835 100644 --- a/src/main/java/cn/nextop/rxjava/share/practices/Practice1.java +++ b/src/main/java/cn/nextop/rxjava/share/practices/Practice1.java @@ -24,12 +24,15 @@ */ public class Practice1 { + // + protected int count = 1; + /* * 举例如下: * 参数 Observable["a","b","c"] * 返回值 Observable[(1, "a"), (2, "b"), (3, "c")] 注意index从1开始 */ public Observable> indexable(Observable observable) { - throw new UnsupportedOperationException("implementation"); + return observable.map(s -> new Tuple2(count++, s)); } } diff --git a/src/main/java/cn/nextop/rxjava/share/practices/Practice2.java b/src/main/java/cn/nextop/rxjava/share/practices/Practice2.java index 08c7dcd..550e058 100644 --- a/src/main/java/cn/nextop/rxjava/share/practices/Practice2.java +++ b/src/main/java/cn/nextop/rxjava/share/practices/Practice2.java @@ -17,11 +17,14 @@ package cn.nextop.rxjava.share.practices; +import java.util.HashMap; +import java.util.Map; +import java.util.concurrent.Callable; + import cn.nextop.rxjava.share.util.type.Tuple2; import io.reactivex.Observable; import io.reactivex.Single; - -import java.util.Map; +import io.reactivex.functions.BiFunction; /** * @author Baoyi Chen @@ -34,7 +37,9 @@ public class Practice2 { * 返回: Observable[("a", 2), ("b", 1), ("c", 2)] */ public Observable> wordCount1(Observable words) { - throw new UnsupportedOperationException("implementation"); + return words.groupBy(key -> key).map(group -> { + return group.count().map(c -> new Tuple2(group.getKey(), c.intValue())).toObservable(); + }).flatMap(f -> f); } /* @@ -43,7 +48,16 @@ public Observable> wordCount1(Observable words) * 返回: Single[Map{a=2, b=1, c=2}] */ public Single> wordCount2(Observable words) { - throw new UnsupportedOperationException("implementation"); + return words.reduceWith(new Callable>() { + @Override + public Map call() throws Exception { + return new HashMap(); + } + }, new BiFunction, String, Map>() { + @Override + public Map apply(Map t1, String t2) throws Exception { + Integer c = t1.get(t2); if(c == null) c = 0; t1.put(t2, c++); return t1; + } + }); } - } diff --git a/src/main/java/cn/nextop/rxjava/share/practices/Practice3.java b/src/main/java/cn/nextop/rxjava/share/practices/Practice3.java index a43bd78..8b261cf 100644 --- a/src/main/java/cn/nextop/rxjava/share/practices/Practice3.java +++ b/src/main/java/cn/nextop/rxjava/share/practices/Practice3.java @@ -16,6 +16,9 @@ package cn.nextop.rxjava.share.practices; +import java.util.ArrayList; +import java.util.List; + import io.reactivex.Maybe; import io.reactivex.Observable; @@ -28,7 +31,7 @@ public class Practice3 { * 根据iterate的结果求和 */ public Maybe sum(Observable observable) { - throw new UnsupportedOperationException("implementation"); + return this.iterate(observable).reduce((a,b) -> a + b); } /* @@ -41,10 +44,16 @@ public Maybe sum(Observable observable) { * * return Observable[4, 3, 6, 7, 5] 顺序无关 */ - public Observable iterate(Observable observable) { - throw new UnsupportedOperationException("implementation"); - } + public Observable iterate(Observable observable) { + return observable.flatMapIterable(n -> { + List l = new ArrayList<>(); + fill(l, n); return l; }).map(n -> n.value); + } + private void fill(List r, Node n) { + if (n == null) return; r.add(n); fill(r, n.left); fill(r, n.right); + } + public static class Node { public Node left; public Node right; diff --git a/src/main/java/cn/nextop/rxjava/share/practices/Practice4.java b/src/main/java/cn/nextop/rxjava/share/practices/Practice4.java index 33a5804..c5df3d2 100644 --- a/src/main/java/cn/nextop/rxjava/share/practices/Practice4.java +++ b/src/main/java/cn/nextop/rxjava/share/practices/Practice4.java @@ -18,6 +18,7 @@ import io.reactivex.Observable; +import io.reactivex.schedulers.Schedulers; /** @@ -44,7 +45,7 @@ public class Practice4 { * */ public Observable runInMultiThread(Observable observable) { - throw new UnsupportedOperationException("implementation"); + return observable.concatMap(s -> Observable.just(s).observeOn(Schedulers.newThread())); } } diff --git a/src/main/java/cn/nextop/rxjava/share/practices/Practice5.java b/src/main/java/cn/nextop/rxjava/share/practices/Practice5.java index 1193642..0c35b43 100644 --- a/src/main/java/cn/nextop/rxjava/share/practices/Practice5.java +++ b/src/main/java/cn/nextop/rxjava/share/practices/Practice5.java @@ -16,14 +16,14 @@ package cn.nextop.rxjava.share.practices; -import io.reactivex.Maybe; -import io.reactivex.Observable; -import io.reactivex.Single; - import java.util.List; import java.util.concurrent.TimeUnit; import java.util.function.Predicate; +import io.reactivex.Maybe; +import io.reactivex.Observable; +import io.reactivex.Single; + /** * @author Baoyi Chen */ @@ -35,7 +35,7 @@ public class Practice5 { * return: Single[3] */ public Single count(Observable source) { - throw new UnsupportedOperationException("implementation"); + return source.count(); } /* @@ -44,7 +44,7 @@ public Single count(Observable source) { * return: Observable["a", "b", "c","b", "c", "d"] */ public Observable convert(Observable> source) { - throw new UnsupportedOperationException("implementation"); + return source.flatMapIterable(s -> s); } /* @@ -53,7 +53,7 @@ public Observable convert(Observable> source) { * return: Observable["a", "b", "c"] */ public Observable distinct(Observable source) { - throw new UnsupportedOperationException("implementation"); + return source.distinct(); } /* @@ -62,7 +62,7 @@ public Observable distinct(Observable source) { * return: Observable[3, 4] */ public Observable filter(Observable source, Predicate conditon) { - throw new UnsupportedOperationException("implementation"); + return source.filter(conditon::test); } /* @@ -71,7 +71,7 @@ public Observable filter(Observable source, Predicate * return: Maybe[3] */ public Maybe elementAt(Observable source, int index) { - throw new UnsupportedOperationException("implementation"); + return source.elementAt(index); } /* @@ -80,7 +80,7 @@ public Maybe elementAt(Observable source, int index) { * return: Observable["a", "b", "a", "b"] */ public Observable repeat(Observable source, int count) { - throw new UnsupportedOperationException("implementation"); + return source.repeat(count); } /* @@ -89,7 +89,7 @@ public Observable repeat(Observable source, int count) { * return: Observable["a", "b"] */ public Observable concat(List> source) { - throw new UnsupportedOperationException("implementation"); + return Observable.concat(source); } /* @@ -98,7 +98,7 @@ public Observable concat(List> source) { * return: Observable["a", "b"] */ public Observable merge(List> source) { - throw new UnsupportedOperationException("implementation"); + return Observable.merge(source); } /* @@ -107,7 +107,7 @@ public Observable merge(List> source) { * return: Observable["a", "b", "c"], 每个元素都延迟1秒 */ public Observable delayAll(Observable source, long delay, TimeUnit unit) { - throw new UnsupportedOperationException("implementation"); + return source.concatMap(s -> Observable.just(s).delay(delay, unit)); } } From fb0aa5b8d523cec3a014200ce05a7376bbdb1517 Mon Sep 17 00:00:00 2001 From: qutl Date: Mon, 26 Mar 2018 18:30:03 +0800 Subject: [PATCH 2/2] z --- .../cn/nextop/rxjava/share/practices/Practice1.java | 4 ++-- .../cn/nextop/rxjava/share/practices/Practice5.java | 12 ++++++------ 2 files changed, 8 insertions(+), 8 deletions(-) diff --git a/src/main/java/cn/nextop/rxjava/share/practices/Practice1.java b/src/main/java/cn/nextop/rxjava/share/practices/Practice1.java index 3921835..f99768d 100644 --- a/src/main/java/cn/nextop/rxjava/share/practices/Practice1.java +++ b/src/main/java/cn/nextop/rxjava/share/practices/Practice1.java @@ -16,6 +16,7 @@ package cn.nextop.rxjava.share.practices; +import cn.nextop.rxjava.share.util.Tuples; import cn.nextop.rxjava.share.util.type.Tuple2; import io.reactivex.Observable; @@ -25,7 +26,6 @@ public class Practice1 { // - protected int count = 1; /* * 举例如下: @@ -33,6 +33,6 @@ public class Practice1 { * 返回值 Observable[(1, "a"), (2, "b"), (3, "c")] 注意index从1开始 */ public Observable> indexable(Observable observable) { - return observable.map(s -> new Tuple2(count++, s)); + return observable.zipWith(Observable.range(1, Integer.MAX_VALUE), (i, s) -> Tuples.of(s, i)); } } diff --git a/src/main/java/cn/nextop/rxjava/share/practices/Practice5.java b/src/main/java/cn/nextop/rxjava/share/practices/Practice5.java index 0c35b43..a786458 100644 --- a/src/main/java/cn/nextop/rxjava/share/practices/Practice5.java +++ b/src/main/java/cn/nextop/rxjava/share/practices/Practice5.java @@ -35,7 +35,7 @@ public class Practice5 { * return: Single[3] */ public Single count(Observable source) { - return source.count(); + return source.reduce(Long.valueOf("0"), (a, b) -> a + 1); } /* @@ -53,7 +53,7 @@ public Observable convert(Observable> source) { * return: Observable["a", "b", "c"] */ public Observable distinct(Observable source) { - return source.distinct(); + return source.groupBy(s -> s).map(e -> e.getKey()); } /* @@ -71,7 +71,7 @@ public Observable filter(Observable source, Predicate * return: Maybe[3] */ public Maybe elementAt(Observable source, int index) { - return source.elementAt(index); + return source.skip(index).take(1).firstElement(); } /* @@ -80,7 +80,7 @@ public Maybe elementAt(Observable source, int index) { * return: Observable["a", "b", "a", "b"] */ public Observable repeat(Observable source, int count) { - return source.repeat(count); + return Observable.range(0, count).concatMap(e -> source); } /* @@ -89,7 +89,7 @@ public Observable repeat(Observable source, int count) { * return: Observable["a", "b"] */ public Observable concat(List> source) { - return Observable.concat(source); + return Observable.fromIterable(source).concatMap(s -> s); } /* @@ -98,7 +98,7 @@ public Observable concat(List> source) { * return: Observable["a", "b"] */ public Observable merge(List> source) { - return Observable.merge(source); + return Observable.fromIterable(source).flatMap(s -> s); } /*