diff --git a/src/main/java/cn/nextop/rxjava/share/practices/Practice1.java b/src/main/java/cn/nextop/rxjava/share/practices/Practice1.java index b432fea..0f13f13 100644 --- a/src/main/java/cn/nextop/rxjava/share/practices/Practice1.java +++ b/src/main/java/cn/nextop/rxjava/share/practices/Practice1.java @@ -16,6 +16,7 @@ package cn.nextop.rxjava.share.practices; +import cn.nextop.rxjava.share.util.Lists; import cn.nextop.rxjava.share.util.type.Tuple2; import io.reactivex.Observable; @@ -30,6 +31,13 @@ public class Practice1 { * 返回值 Observable[(1, "a"), (2, "b"), (3, "c")] 注意index从1开始 */ public Observable> indexable(Observable observable) { - throw new UnsupportedOperationException("implementation"); + Observable.fromIterable(Lists.of("a", "b", "c")).subscribe(System.out::println); + + return observable.map(s -> { + return new Tuple2(1,s); + }) + .scan((acc, t) -> { + return new Tuple2(acc.getV1()+1, t.getV2()); + }); } } diff --git a/src/main/java/cn/nextop/rxjava/share/practices/Practice2.java b/src/main/java/cn/nextop/rxjava/share/practices/Practice2.java index 08c7dcd..de6ac92 100644 --- a/src/main/java/cn/nextop/rxjava/share/practices/Practice2.java +++ b/src/main/java/cn/nextop/rxjava/share/practices/Practice2.java @@ -17,10 +17,13 @@ package cn.nextop.rxjava.share.practices; +import cn.nextop.rxjava.share.util.Tuples; import cn.nextop.rxjava.share.util.type.Tuple2; import io.reactivex.Observable; import io.reactivex.Single; +import io.reactivex.schedulers.Schedulers; +import java.util.HashMap; import java.util.Map; /** @@ -34,7 +37,9 @@ public class Practice2 { * 返回: Observable[("a", 2), ("b", 1), ("c", 2)] */ public Observable> wordCount1(Observable words) { - throw new UnsupportedOperationException("implementation"); + return words.groupBy(s -> s).map(groups -> { + return groups.count().toObservable().map(c -> Tuples.of(groups.getKey(), c.intValue())); + }).flatMap(e->e); } /* @@ -43,7 +48,10 @@ public Observable> wordCount1(Observable words) * 返回: Single[Map{a=2, b=1, c=2}] */ public Single> wordCount2(Observable words) { - throw new UnsupportedOperationException("implementation"); + return this.wordCount1(words).reduce(new HashMap(), (t1, t2) -> { + t1.put(t2.getV1(), t2.getV2()); + return t1; + }); } } diff --git a/src/main/java/cn/nextop/rxjava/share/practices/Practice3.java b/src/main/java/cn/nextop/rxjava/share/practices/Practice3.java index a43bd78..f6766ca 100644 --- a/src/main/java/cn/nextop/rxjava/share/practices/Practice3.java +++ b/src/main/java/cn/nextop/rxjava/share/practices/Practice3.java @@ -19,6 +19,9 @@ import io.reactivex.Maybe; import io.reactivex.Observable; +import java.util.ArrayList; +import java.util.List; + /** * @author Baoyi Chen */ @@ -28,7 +31,7 @@ public class Practice3 { * 根据iterate的结果求和 */ public Maybe sum(Observable observable) { - throw new UnsupportedOperationException("implementation"); + return this.iterate(observable).reduce((x,y)-> x+y); } /* @@ -42,7 +45,24 @@ public Maybe sum(Observable observable) { * return Observable[4, 3, 6, 7, 5] 顺序无关 */ public Observable iterate(Observable observable) { - throw new UnsupportedOperationException("implementation"); + return Observable.create(e -> { + observable.subscribe(node -> { + List arr = new ArrayList(); + this._iterate(node, arr); + for(Integer num : arr) { + e.onNext(num); + } + }); + e.onComplete(); + }); + } + + private void _iterate(Node node, List arr) { + if(node != null) { + _iterate(node.left, arr); + _iterate(node.right, arr); + arr.add(node.value); + } } public static class Node { diff --git a/src/main/java/cn/nextop/rxjava/share/practices/Practice4.java b/src/main/java/cn/nextop/rxjava/share/practices/Practice4.java index 33a5804..f2e103e 100644 --- a/src/main/java/cn/nextop/rxjava/share/practices/Practice4.java +++ b/src/main/java/cn/nextop/rxjava/share/practices/Practice4.java @@ -18,6 +18,7 @@ import io.reactivex.Observable; +import io.reactivex.schedulers.Schedulers; /** @@ -44,7 +45,7 @@ public class Practice4 { * */ public Observable runInMultiThread(Observable observable) { - throw new UnsupportedOperationException("implementation"); + return observable.map(s -> Observable.just(s).subscribeOn(Schedulers.io())).flatMap(e->e); } } diff --git a/src/main/java/cn/nextop/rxjava/share/practices/Practice5.java b/src/main/java/cn/nextop/rxjava/share/practices/Practice5.java index 1193642..a7d4933 100644 --- a/src/main/java/cn/nextop/rxjava/share/practices/Practice5.java +++ b/src/main/java/cn/nextop/rxjava/share/practices/Practice5.java @@ -16,10 +16,15 @@ package cn.nextop.rxjava.share.practices; +import cn.nextop.rxjava.share.util.Lists; import io.reactivex.Maybe; import io.reactivex.Observable; +import io.reactivex.Scheduler; import io.reactivex.Single; +import io.reactivex.schedulers.Schedulers; +import java.lang.reflect.Array; +import java.util.ArrayList; import java.util.List; import java.util.concurrent.TimeUnit; import java.util.function.Predicate; @@ -35,7 +40,7 @@ public class Practice5 { * return: Single[3] */ public Single count(Observable source) { - throw new UnsupportedOperationException("implementation"); + return source.reduce((long)0, (acc, s) -> acc+1); } /* @@ -44,7 +49,7 @@ public Single count(Observable source) { * return: Observable["a", "b", "c","b", "c", "d"] */ public Observable convert(Observable> source) { - throw new UnsupportedOperationException("implementation"); + return source.map(l -> Observable.fromIterable(l)).flatMap(s->s); } /* @@ -53,7 +58,7 @@ public Observable convert(Observable> source) { * return: Observable["a", "b", "c"] */ public Observable distinct(Observable source) { - throw new UnsupportedOperationException("implementation"); + return new Practice2().wordCount1(source).map(t -> t.getV1()); } /* @@ -62,7 +67,7 @@ public Observable distinct(Observable source) { * return: Observable[3, 4] */ public Observable filter(Observable source, Predicate conditon) { - throw new UnsupportedOperationException("implementation"); + return source.groupBy(n -> conditon.test(n)).buffer(2).map(arr -> arr.get(0).getKey() ? arr.get(0) : arr.get(1)).flatMap(s->s); } /* @@ -71,7 +76,7 @@ public Observable filter(Observable source, Predicate * return: Maybe[3] */ public Maybe elementAt(Observable source, int index) { - throw new UnsupportedOperationException("implementation"); + return Maybe.fromFuture(source.skip(index).take(1).toFuture()); } /* @@ -80,16 +85,23 @@ public Maybe elementAt(Observable source, int index) { * return: Observable["a", "b", "a", "b"] */ public Observable repeat(Observable source, int count) { - throw new UnsupportedOperationException("implementation"); + Observable head = Observable.empty(); + for(int i=0;i concat(List> source) { - throw new UnsupportedOperationException("implementation"); + List arr = new ArrayList(); + for(Observable o : source) { + o.blockingSubscribe(s -> arr.add(s)); + } + return Observable.fromIterable(arr); } /* @@ -98,7 +110,7 @@ public Observable concat(List> source) { * return: Observable["a", "b"] */ public Observable merge(List> source) { - throw new UnsupportedOperationException("implementation"); + return Observable.fromIterable(source).flatMap(s->s); } /* @@ -107,7 +119,14 @@ public Observable merge(List> source) { * return: Observable["a", "b", "c"], 每个元素都延迟1秒 */ public Observable delayAll(Observable source, long delay, TimeUnit unit) { - throw new UnsupportedOperationException("implementation"); + return Observable.create(emiter -> { + source.forEach(s -> { + Observable.just(s).delay(delay, unit).blockingSubscribe(c -> { + emiter.onNext(c); + }); + }); + emiter.onComplete(); + }); } }