diff --git a/src/main/java/cn/nextop/rxjava/share/practices/Practice1.java b/src/main/java/cn/nextop/rxjava/share/practices/Practice1.java index b432fea..376cb42 100644 --- a/src/main/java/cn/nextop/rxjava/share/practices/Practice1.java +++ b/src/main/java/cn/nextop/rxjava/share/practices/Practice1.java @@ -19,6 +19,10 @@ import cn.nextop.rxjava.share.util.type.Tuple2; import io.reactivex.Observable; +import static cn.nextop.rxjava.share.util.Tuples.of; +import static io.reactivex.Observable.range; +import static java.lang.Integer.MAX_VALUE; + /** * @author Baoyi Chen */ @@ -29,7 +33,7 @@ public class Practice1 { * 参数 Observable["a","b","c"] * 返回值 Observable[(1, "a"), (2, "b"), (3, "c")] 注意index从1开始 */ - public Observable> indexable(Observable observable) { - throw new UnsupportedOperationException("implementation"); + public Observable> indexable(Observable ob) { + return ob.zipWith(range(1, MAX_VALUE), (i, s) -> of(s, i)); } } diff --git a/src/main/java/cn/nextop/rxjava/share/practices/Practice2.java b/src/main/java/cn/nextop/rxjava/share/practices/Practice2.java index 08c7dcd..363c0b4 100644 --- a/src/main/java/cn/nextop/rxjava/share/practices/Practice2.java +++ b/src/main/java/cn/nextop/rxjava/share/practices/Practice2.java @@ -21,8 +21,11 @@ import io.reactivex.Observable; import io.reactivex.Single; +import java.util.HashMap; import java.util.Map; +import static cn.nextop.rxjava.share.util.Tuples.of; + /** * @author Baoyi Chen */ @@ -34,7 +37,7 @@ public class Practice2 { * 返回: Observable[("a", 2), ("b", 1), ("c", 2)] */ public Observable> wordCount1(Observable words) { - throw new UnsupportedOperationException("implementation"); + return words.groupBy(e -> e).flatMap(e -> e.count().map(x -> of(e.getKey(), x.intValue())).toObservable()); } /* @@ -43,7 +46,9 @@ public Observable> wordCount1(Observable words) * 返回: Single[Map{a=2, b=1, c=2}] */ public Single> wordCount2(Observable words) { - throw new UnsupportedOperationException("implementation"); + return words.reduceWith(() -> new HashMap(), (a, b) -> { + if (a.containsKey(b)) a.put(b, a.get(b) + 1); else a.put(b, 1); return a; + }); } } diff --git a/src/main/java/cn/nextop/rxjava/share/practices/Practice3.java b/src/main/java/cn/nextop/rxjava/share/practices/Practice3.java index a43bd78..12d90e1 100644 --- a/src/main/java/cn/nextop/rxjava/share/practices/Practice3.java +++ b/src/main/java/cn/nextop/rxjava/share/practices/Practice3.java @@ -19,6 +19,10 @@ import io.reactivex.Maybe; import io.reactivex.Observable; +import static io.reactivex.Observable.empty; +import static io.reactivex.Observable.just; +import static io.reactivex.Observable.merge; + /** * @author Baoyi Chen */ @@ -28,7 +32,7 @@ public class Practice3 { * 根据iterate的结果求和 */ public Maybe sum(Observable observable) { - throw new UnsupportedOperationException("implementation"); + return iterate(observable).reduce((x, y) -> x + y); } /* @@ -42,7 +46,11 @@ public Maybe sum(Observable observable) { * return Observable[4, 3, 6, 7, 5] 顺序无关 */ public Observable iterate(Observable observable) { - throw new UnsupportedOperationException("implementation"); + return observable.flatMap(n -> { + Observable left = n.left == null ? empty() : iterate(just(n.left)); + Observable right = n.right == null ? empty() : iterate(just(n.right)); + return merge(left, right, just(n.value)); + }); } public static class Node { diff --git a/src/main/java/cn/nextop/rxjava/share/practices/Practice4.java b/src/main/java/cn/nextop/rxjava/share/practices/Practice4.java index 33a5804..b42f059 100644 --- a/src/main/java/cn/nextop/rxjava/share/practices/Practice4.java +++ b/src/main/java/cn/nextop/rxjava/share/practices/Practice4.java @@ -19,6 +19,8 @@ import io.reactivex.Observable; +import static io.reactivex.Observable.just; +import static io.reactivex.schedulers.Schedulers.newThread; /** * @author Baoyi Chen @@ -43,8 +45,8 @@ public class Practice4 { * --------------- * */ - public Observable runInMultiThread(Observable observable) { - throw new UnsupportedOperationException("implementation"); + public Observable runInMultiThread(Observable ob) { + return ob.flatMap(e -> just(e).subscribeOn(newThread())); } } diff --git a/src/main/java/cn/nextop/rxjava/share/practices/Practice5.java b/src/main/java/cn/nextop/rxjava/share/practices/Practice5.java index 1193642..40e8cb2 100644 --- a/src/main/java/cn/nextop/rxjava/share/practices/Practice5.java +++ b/src/main/java/cn/nextop/rxjava/share/practices/Practice5.java @@ -18,12 +18,17 @@ import io.reactivex.Maybe; import io.reactivex.Observable; +import io.reactivex.ObservableEmitter; import io.reactivex.Single; import java.util.List; import java.util.concurrent.TimeUnit; import java.util.function.Predicate; +import static cn.nextop.rxjava.share.util.Tuples.of; +import static io.reactivex.Observable.*; +import static java.lang.Integer.MAX_VALUE; + /** * @author Baoyi Chen */ @@ -35,7 +40,7 @@ public class Practice5 { * return: Single[3] */ public Single count(Observable source) { - throw new UnsupportedOperationException("implementation"); + return source.reduce(0L, (a, b) -> a + 1); } /* @@ -44,7 +49,7 @@ public Single count(Observable source) { * return: Observable["a", "b", "c","b", "c", "d"] */ public Observable convert(Observable> source) { - throw new UnsupportedOperationException("implementation"); + return source.concatMap(e -> fromIterable(e)); } /* @@ -53,7 +58,7 @@ public Observable convert(Observable> source) { * return: Observable["a", "b", "c"] */ public Observable distinct(Observable source) { - throw new UnsupportedOperationException("implementation"); + return source.groupBy(e -> e).map(e -> e.getKey()); } /* @@ -61,8 +66,8 @@ public Observable distinct(Observable source) { * param: Observable[1, 2, 3, 4, 5] , conditon = x > 2 and x < 5 * return: Observable[3, 4] */ - public Observable filter(Observable source, Predicate conditon) { - throw new UnsupportedOperationException("implementation"); + public Observable filter(Observable source, Predicate condition) { + return source.concatMap(e -> condition.test(e) ? just(e) : empty()); } /* @@ -71,7 +76,7 @@ public Observable filter(Observable source, Predicate * return: Maybe[3] */ public Maybe elementAt(Observable source, int index) { - throw new UnsupportedOperationException("implementation"); + return source.zipWith(range(0, MAX_VALUE), (a, b) -> of(a, b)).filter(x -> x.getV2() == index).map(e -> e.getV1()).firstElement(); } /* @@ -80,7 +85,7 @@ public Maybe elementAt(Observable source, int index) { * return: Observable["a", "b", "a", "b"] */ public Observable repeat(Observable source, int count) { - throw new UnsupportedOperationException("implementation"); + return range(0, count).concatMap(e -> source); } /* @@ -89,7 +94,17 @@ public Observable repeat(Observable source, int count) { * return: Observable["a", "b"] */ public Observable concat(List> source) { - throw new UnsupportedOperationException("implementation"); + return create(e -> concat(source, e)); + } + + private void concat(List> source, ObservableEmitter e) { + if (source.isEmpty()) { + e.onComplete(); + } else { + source.get(0).subscribe(e::onNext, e::onError, () -> { + concat(source.subList(1, source.size()), e); + }); + } } /* @@ -98,7 +113,7 @@ public Observable concat(List> source) { * return: Observable["a", "b"] */ public Observable merge(List> source) { - throw new UnsupportedOperationException("implementation"); + return fromIterable(source).flatMap(e -> e); } /* @@ -107,7 +122,7 @@ public Observable merge(List> source) { * return: Observable["a", "b", "c"], 每个元素都延迟1秒 */ public Observable delayAll(Observable source, long delay, TimeUnit unit) { - throw new UnsupportedOperationException("implementation"); + return source.concatMap(e -> just(e).delay(delay, unit)); } }