From 68ec7cbdda52dcea2a575e72ffca26e121eed0bf Mon Sep 17 00:00:00 2001 From: yunfei Date: Mon, 26 Mar 2018 16:28:20 +0800 Subject: [PATCH] test --- .../rxjava/share/practices/Practice1.java | 3 +- .../rxjava/share/practices/Practice2.java | 12 +++-- .../rxjava/share/practices/Practice3.java | 8 +++- .../rxjava/share/practices/Practice4.java | 3 +- .../rxjava/share/practices/Practice5.java | 48 +++++++++++++------ 5 files changed, 52 insertions(+), 22 deletions(-) diff --git a/src/main/java/cn/nextop/rxjava/share/practices/Practice1.java b/src/main/java/cn/nextop/rxjava/share/practices/Practice1.java index b432fea..de9809a 100644 --- a/src/main/java/cn/nextop/rxjava/share/practices/Practice1.java +++ b/src/main/java/cn/nextop/rxjava/share/practices/Practice1.java @@ -16,6 +16,7 @@ package cn.nextop.rxjava.share.practices; +import cn.nextop.rxjava.share.util.Tuples; import cn.nextop.rxjava.share.util.type.Tuple2; import io.reactivex.Observable; @@ -30,6 +31,6 @@ public class Practice1 { * 返回值 Observable[(1, "a"), (2, "b"), (3, "c")] 注意index从1开始 */ public Observable> indexable(Observable observable) { - throw new UnsupportedOperationException("implementation"); + return observable.zipWith(Observable.range(1, Integer.MAX_VALUE), (v, n) -> Tuples.of(n, v)); } } diff --git a/src/main/java/cn/nextop/rxjava/share/practices/Practice2.java b/src/main/java/cn/nextop/rxjava/share/practices/Practice2.java index 08c7dcd..0f71cea 100644 --- a/src/main/java/cn/nextop/rxjava/share/practices/Practice2.java +++ b/src/main/java/cn/nextop/rxjava/share/practices/Practice2.java @@ -17,12 +17,14 @@ package cn.nextop.rxjava.share.practices; +import java.util.HashMap; +import java.util.Map; + +import cn.nextop.rxjava.share.util.Tuples; import cn.nextop.rxjava.share.util.type.Tuple2; import io.reactivex.Observable; import io.reactivex.Single; -import java.util.Map; - /** * @author Baoyi Chen */ @@ -34,7 +36,7 @@ public class Practice2 { * 返回: Observable[("a", 2), ("b", 1), ("c", 2)] */ public Observable> wordCount1(Observable words) { - throw new UnsupportedOperationException("implementation"); + return words.groupBy(e -> e).flatMap(e -> e.count().toObservable().map(x -> Tuples.of(e.getKey(), x.intValue()))); } /* @@ -43,7 +45,9 @@ public Observable> wordCount1(Observable words) * 返回: Single[Map{a=2, b=1, c=2}] */ public Single> wordCount2(Observable words) { - throw new UnsupportedOperationException("implementation"); + return wordCount1(words).reduce(new HashMap(), (a, b) -> {a.put(b.getV1(), b.getV2()); + return a; + }); } } diff --git a/src/main/java/cn/nextop/rxjava/share/practices/Practice3.java b/src/main/java/cn/nextop/rxjava/share/practices/Practice3.java index a43bd78..2204cd4 100644 --- a/src/main/java/cn/nextop/rxjava/share/practices/Practice3.java +++ b/src/main/java/cn/nextop/rxjava/share/practices/Practice3.java @@ -28,7 +28,7 @@ public class Practice3 { * 根据iterate的结果求和 */ public Maybe sum(Observable observable) { - throw new UnsupportedOperationException("implementation"); + return iterate(observable).reduce((x, y) -> x + y); } /* @@ -42,7 +42,11 @@ public Maybe sum(Observable observable) { * return Observable[4, 3, 6, 7, 5] 顺序无关 */ public Observable iterate(Observable observable) { - throw new UnsupportedOperationException("implementation"); + return observable.flatMap(n -> { + Observable left = n.left == null ? Observable.empty() : iterate(Observable.just(n.left)); + Observable right = n.right == null ? Observable.empty() : iterate(Observable.just(n.right)); + return Observable.merge(left, right, Observable.just(n.value)); + }); } public static class Node { diff --git a/src/main/java/cn/nextop/rxjava/share/practices/Practice4.java b/src/main/java/cn/nextop/rxjava/share/practices/Practice4.java index 33a5804..68229f3 100644 --- a/src/main/java/cn/nextop/rxjava/share/practices/Practice4.java +++ b/src/main/java/cn/nextop/rxjava/share/practices/Practice4.java @@ -18,6 +18,7 @@ import io.reactivex.Observable; +import io.reactivex.schedulers.Schedulers; /** @@ -44,7 +45,7 @@ public class Practice4 { * */ public Observable runInMultiThread(Observable observable) { - throw new UnsupportedOperationException("implementation"); + return observable.flatMap(e -> Observable.just(e).subscribeOn(Schedulers.newThread())); } } diff --git a/src/main/java/cn/nextop/rxjava/share/practices/Practice5.java b/src/main/java/cn/nextop/rxjava/share/practices/Practice5.java index 1193642..c22df6f 100644 --- a/src/main/java/cn/nextop/rxjava/share/practices/Practice5.java +++ b/src/main/java/cn/nextop/rxjava/share/practices/Practice5.java @@ -16,14 +16,16 @@ package cn.nextop.rxjava.share.practices; -import io.reactivex.Maybe; -import io.reactivex.Observable; -import io.reactivex.Single; - import java.util.List; import java.util.concurrent.TimeUnit; import java.util.function.Predicate; +import cn.nextop.rxjava.share.util.Tuples; +import io.reactivex.Maybe; +import io.reactivex.Observable; +import io.reactivex.ObservableEmitter; +import io.reactivex.Single; + /** * @author Baoyi Chen */ @@ -35,7 +37,8 @@ public class Practice5 { * return: Single[3] */ public Single count(Observable source) { - throw new UnsupportedOperationException("implementation"); +// return source.count(); + return source.reduce(0L, (a, b) -> a + 1); } /* @@ -44,7 +47,7 @@ public Single count(Observable source) { * return: Observable["a", "b", "c","b", "c", "d"] */ public Observable convert(Observable> source) { - throw new UnsupportedOperationException("implementation"); + return source.flatMap(e -> Observable.fromIterable(e)); } /* @@ -53,7 +56,8 @@ public Observable convert(Observable> source) { * return: Observable["a", "b", "c"] */ public Observable distinct(Observable source) { - throw new UnsupportedOperationException("implementation"); +// return source.distinct(); + return source.groupBy(e -> e).map(e -> e.getKey()); } /* @@ -62,7 +66,9 @@ public Observable distinct(Observable source) { * return: Observable[3, 4] */ public Observable filter(Observable source, Predicate conditon) { - throw new UnsupportedOperationException("implementation"); + return source.concatMap(e -> { + if (conditon.test(e)) return Observable.just(e); else return Observable.empty(); + }); } /* @@ -71,7 +77,8 @@ public Observable filter(Observable source, Predicate * return: Maybe[3] */ public Maybe elementAt(Observable source, int index) { - throw new UnsupportedOperationException("implementation"); +// return source.elementAt(index); + return source.zipWith(Observable.range(0, Integer.MAX_VALUE), (a, b) -> Tuples.of(a, b)).filter(x -> x.getV2() == index).map(e -> e.getV1()).firstElement(); } /* @@ -80,7 +87,8 @@ public Maybe elementAt(Observable source, int index) { * return: Observable["a", "b", "a", "b"] */ public Observable repeat(Observable source, int count) { - throw new UnsupportedOperationException("implementation"); +// return source.repeat(count); + return Observable.range(0, count).concatMap(e -> source); } /* @@ -89,16 +97,27 @@ public Observable repeat(Observable source, int count) { * return: Observable["a", "b"] */ public Observable concat(List> source) { - throw new UnsupportedOperationException("implementation"); +// return Observable.concat(source); + return Observable.create(emitter -> { concat(source, emitter); }); + } + private void concat(List> source, ObservableEmitter emitter) { + if (source.isEmpty()) { + emitter.onComplete(); + } else { + source.get(0).subscribe(e -> { + emitter.onNext(e); + }, e -> emitter.onError(e), () -> { + concat(source.subList(1, source.size()), emitter); + }); + } } - /* * example: * param: Observable["a"], Observable["b"] * return: Observable["a", "b"] */ public Observable merge(List> source) { - throw new UnsupportedOperationException("implementation"); + return Observable.fromIterable(source).flatMap(e -> e); } /* @@ -107,7 +126,8 @@ public Observable merge(List> source) { * return: Observable["a", "b", "c"], 每个元素都延迟1秒 */ public Observable delayAll(Observable source, long delay, TimeUnit unit) { - throw new UnsupportedOperationException("implementation"); +// return source.delay(delay, unit); + return source.concatMap(e -> Observable.just(e).delay(delay, unit)); } }