diff --git a/src/main/java/cn/nextop/rxjava/share/practices/Practice1.java b/src/main/java/cn/nextop/rxjava/share/practices/Practice1.java index b432fea..1f2c82c 100644 --- a/src/main/java/cn/nextop/rxjava/share/practices/Practice1.java +++ b/src/main/java/cn/nextop/rxjava/share/practices/Practice1.java @@ -1,35 +1,36 @@ -/* - * Copyright 2016-2017 Leon Chen - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package cn.nextop.rxjava.share.practices; - -import cn.nextop.rxjava.share.util.type.Tuple2; -import io.reactivex.Observable; - -/** - * @author Baoyi Chen - */ -public class Practice1 { - - /* - * 举例如下: - * 参数 Observable["a","b","c"] - * 返回值 Observable[(1, "a"), (2, "b"), (3, "c")] 注意index从1开始 - */ - public Observable> indexable(Observable observable) { - throw new UnsupportedOperationException("implementation"); - } -} +/* + * Copyright 2016-2017 Leon Chen + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package cn.nextop.rxjava.share.practices; + +import cn.nextop.rxjava.share.util.Tuples; +import cn.nextop.rxjava.share.util.type.Tuple2; +import io.reactivex.Observable; + +/** + * @author Baoyi Chen + */ +public class Practice1 { + + /* + * 举例如下: + * 参数 Observable["a","b","c"] + * 返回值 Observable[(1, "a"), (2, "b"), (3, "c")] 注意index从1开始 + */ + public Observable> indexable(Observable observable) { + return Observable.zip(observable, Observable.range(1, Integer.MAX_VALUE), (a, b) -> Tuples.of(b, a)); + } +} diff --git a/src/main/java/cn/nextop/rxjava/share/practices/Practice2.java b/src/main/java/cn/nextop/rxjava/share/practices/Practice2.java index 08c7dcd..099039d 100644 --- a/src/main/java/cn/nextop/rxjava/share/practices/Practice2.java +++ b/src/main/java/cn/nextop/rxjava/share/practices/Practice2.java @@ -1,49 +1,53 @@ -/* - * Copyright 2016-2017 Leon Chen - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package cn.nextop.rxjava.share.practices; - - -import cn.nextop.rxjava.share.util.type.Tuple2; -import io.reactivex.Observable; -import io.reactivex.Single; - -import java.util.Map; - -/** - * @author Baoyi Chen - */ -public class Practice2 { - - /* - * 举例: - * words = Observable["a", "a", "b", "c", "c"] - * 返回: Observable[("a", 2), ("b", 1), ("c", 2)] - */ - public Observable> wordCount1(Observable words) { - throw new UnsupportedOperationException("implementation"); - } - - /* - * 举例: - * words = Observable["a", "a", "b", "c", "c"] - * 返回: Single[Map{a=2, b=1, c=2}] - */ - public Single> wordCount2(Observable words) { - throw new UnsupportedOperationException("implementation"); - } - -} +/* + * Copyright 2016-2017 Leon Chen + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package cn.nextop.rxjava.share.practices; + + +import cn.nextop.rxjava.share.util.Tuples; +import cn.nextop.rxjava.share.util.type.Tuple2; +import io.reactivex.*; + +import java.util.HashMap; +import java.util.Map; + +/** + * @author Baoyi Chen + */ +public class Practice2 { + + /* + * 举例: + * words = Observable["a", "a", "b", "c", "c"] + * 返回: Observable[("a", 2), ("b", 1), ("c", 2)] + */ + public Observable> wordCount1(Observable words) { + return words.groupBy(string -> string).flatMap(observable-> observable.count().toObservable().map(count-> Tuples.of(observable.getKey(),count.intValue()))); + } + + /* + * 举例: + * words = Observable["a", "a", "b", "c", "c"] + * 返回: Single[Map{a=2, b=1, c=2}] + */ + public Single> wordCount2(Observable words) { + return words.reduce(new HashMap<>(), (map, s) -> { + map.put(s, map.get(s) == null ? 1 : map.get(s) +1); + return map; + }); + } + +} diff --git a/src/main/java/cn/nextop/rxjava/share/practices/Practice3.java b/src/main/java/cn/nextop/rxjava/share/practices/Practice3.java index a43bd78..03f93e0 100644 --- a/src/main/java/cn/nextop/rxjava/share/practices/Practice3.java +++ b/src/main/java/cn/nextop/rxjava/share/practices/Practice3.java @@ -1,60 +1,64 @@ -/* - * Copyright 2016-2017 Leon Chen - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package cn.nextop.rxjava.share.practices; - -import io.reactivex.Maybe; -import io.reactivex.Observable; - -/** - * @author Baoyi Chen - */ -public class Practice3 { - - /* - * 根据iterate的结果求和 - */ - public Maybe sum(Observable observable) { - throw new UnsupportedOperationException("implementation"); - } - - /* - * 举例: - * 5 - * / \ - * 6 7 - * / \ \ - * 4 3 nil - * - * return Observable[4, 3, 6, 7, 5] 顺序无关 - */ - public Observable iterate(Observable observable) { - throw new UnsupportedOperationException("implementation"); - } - - public static class Node { - public Node left; - public Node right; - public int value; - - public Node(Node left, Node right, int value) { - this.left = left; - this.right = right; - this.value = value; - } - } - -} +/* + * Copyright 2016-2017 Leon Chen + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package cn.nextop.rxjava.share.practices; + +import io.reactivex.*; + +/** + * @author Baoyi Chen + */ +public class Practice3 { + + /* + * 根据iterate的结果求和 + */ + public Maybe sum(Observable observable) { + return iterate(observable).reduce((x, y) -> x + y); + } + + /* + * 举例: + * 5 + * / \ + * 6 7 + * / \ \ + * 4 3 nil + * + * return Observable[4, 3, 6, 7, 5] 顺序无关 + */ + public Observable iterate(Observable observable) { + return observable.flatMap(e -> { + Observable left = e.left == null ? Observable.empty() : iterate(Observable.just(e.left)); + Observable right = e.right == null ? Observable.empty() : iterate(Observable.just(e.right)); + Observable value = Observable.just(e.value); + return Observable.merge(left, right, value); + }); + } + + public static class Node { + public Node left; + public Node right; + public int value; + + public Node(Node left, Node right, int value) { + this.left = left; + this.right = right; + this.value = value; + } + } + +} diff --git a/src/main/java/cn/nextop/rxjava/share/practices/Practice4.java b/src/main/java/cn/nextop/rxjava/share/practices/Practice4.java index 33a5804..20a1f5a 100644 --- a/src/main/java/cn/nextop/rxjava/share/practices/Practice4.java +++ b/src/main/java/cn/nextop/rxjava/share/practices/Practice4.java @@ -1,50 +1,51 @@ -/* - * Copyright 2016-2017 Leon Chen - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package cn.nextop.rxjava.share.practices; - - -import io.reactivex.Observable; - - -/** - * @author Baoyi Chen - */ -public class Practice4 { - - /* - * 举例: - * 参数observable = Observable["a", "b", "c"] - * 参数observer在消费observable时,每个元素都在独立的线程 - * - * thread 1 --------------- - * |-----------| ["a"] | - * | --------------- - * | - * ------------------------- ---------- |thread 2 --------------- - * |Observable["a","b","c"]|----|Observer|----|-----------| ["b"] | - * ------------------------- ---------- | --------------- - * | - * |thread 3 --------------- - * |-----------| ["c"] | - * --------------- - * - */ - public Observable runInMultiThread(Observable observable) { - throw new UnsupportedOperationException("implementation"); - } - -} +/* + * Copyright 2016-2017 Leon Chen + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package cn.nextop.rxjava.share.practices; + + +import io.reactivex.*; +import io.reactivex.schedulers.Schedulers; + + +/** + * @author Baoyi Chen + */ +public class Practice4 { + + /* + * 举例: + * 参数observable = Observable["a", "b", "c"] + * 参数observer在消费observable时,每个元素都在独立的线程 + * + * thread 1 --------------- + * |-----------| ["a"] | + * | --------------- + * | + * ------------------------- ---------- |thread 2 --------------- + * |Observable["a","b","c"]|----|Observer|----|-----------| ["b"] | + * ------------------------- ---------- | --------------- + * | + * |thread 3 --------------- + * |-----------| ["c"] | + * --------------- + * + */ + public Observable runInMultiThread(Observable observable) { + return observable.flatMap(o-> Observable.just(o).subscribeOn(Schedulers.newThread())); + } + +} diff --git a/src/main/java/cn/nextop/rxjava/share/practices/Practice5.java b/src/main/java/cn/nextop/rxjava/share/practices/Practice5.java index 1193642..d1c076c 100644 --- a/src/main/java/cn/nextop/rxjava/share/practices/Practice5.java +++ b/src/main/java/cn/nextop/rxjava/share/practices/Practice5.java @@ -1,113 +1,123 @@ -/* - * Copyright 2016-2017 Leon Chen - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package cn.nextop.rxjava.share.practices; - -import io.reactivex.Maybe; -import io.reactivex.Observable; -import io.reactivex.Single; - -import java.util.List; -import java.util.concurrent.TimeUnit; -import java.util.function.Predicate; - -/** - * @author Baoyi Chen - */ -public class Practice5 { - - /* - * example: - * param: Observable["a","b","c"] - * return: Single[3] - */ - public Single count(Observable source) { - throw new UnsupportedOperationException("implementation"); - } - - /* - * example: - * param: Observable[["a", "b", "c"], ["b", "c", "d"]] - * return: Observable["a", "b", "c","b", "c", "d"] - */ - public Observable convert(Observable> source) { - throw new UnsupportedOperationException("implementation"); - } - - /* - * example: - * param: Observable["a", "a", "b", "b", "c"] - * return: Observable["a", "b", "c"] - */ - public Observable distinct(Observable source) { - throw new UnsupportedOperationException("implementation"); - } - - /* - * example: - * param: Observable[1, 2, 3, 4, 5] , conditon = x > 2 and x < 5 - * return: Observable[3, 4] - */ - public Observable filter(Observable source, Predicate conditon) { - throw new UnsupportedOperationException("implementation"); - } - - /* - * example: - * param: Observable[1, 2, 3, 4, 5] , index = 2 - * return: Maybe[3] - */ - public Maybe elementAt(Observable source, int index) { - throw new UnsupportedOperationException("implementation"); - } - - /* - * example: - * param: Observable["a", "b"] , count = 2 - * return: Observable["a", "b", "a", "b"] - */ - public Observable repeat(Observable source, int count) { - throw new UnsupportedOperationException("implementation"); - } - - /* - * example: - * param: Observable["a"], Observable["b"] - * return: Observable["a", "b"] - */ - public Observable concat(List> source) { - throw new UnsupportedOperationException("implementation"); - } - - /* - * example: - * param: Observable["a"], Observable["b"] - * return: Observable["a", "b"] - */ - public Observable merge(List> source) { - throw new UnsupportedOperationException("implementation"); - } - - /* - * example: - * param: Observable["a", "b", "c"], 1, SECONDS - * return: Observable["a", "b", "c"], 每个元素都延迟1秒 - */ - public Observable delayAll(Observable source, long delay, TimeUnit unit) { - throw new UnsupportedOperationException("implementation"); - } - -} +/* + * Copyright 2016-2017 Leon Chen + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package cn.nextop.rxjava.share.practices; + +import cn.nextop.rxjava.share.util.Tuples; +import io.reactivex.*; + +import java.util.List; +import java.util.concurrent.TimeUnit; +import java.util.function.Predicate; + +/** + * @author Baoyi Chen + */ +public class Practice5 { + + /* + * example: + * param: Observable["a","b","c"] + * return: Single[3] + */ + public Single count(Observable source) { + return source.reduce(0l, (a, b) -> a + 1); + } + + /* + * example: + * param: Observable[["a", "b", "c"], ["b", "c", "d"]] + * return: Observable["a", "b", "c","b", "c", "d"] + */ + public Observable convert(Observable> source) { + return source.concatMap(e -> Observable.fromIterable(e)); + } + + /* + * example: + * param: Observable["a", "a", "b", "b", "c"] + * return: Observable["a", "b", "c"] + */ + public Observable distinct(Observable source) { + return source.groupBy(e -> e).map(e -> e.getKey()); + } + + /* + * example: + * param: Observable[1, 2, 3, 4, 5] , conditon = x > 2 and x < 5 + * return: Observable[3, 4] + */ + public Observable filter(Observable source, Predicate conditon) { + return source.flatMap(e -> conditon.test(e) ? Observable.just(e) : Observable.empty()); + } + + /* + * example: + * param: Observable[1, 2, 3, 4, 5] , index = 2 + * return: Maybe[3] + */ + public Maybe elementAt(Observable source, int index) { + return source.zipWith(Observable.range(0, Integer.MAX_VALUE), (a, b) -> Tuples.of(a, b)) + .filter(x -> x.getV2() == index).map(e -> e.getV1()).firstElement(); + } + + /* + * example: + * param: Observable["a", "b"] , count = 2 + * return: Observable["a", "b", "a", "b"] + */ + public Observable repeat(Observable source, int count) { + return Observable.range(1, count).flatMap(e -> source); + } + + /* + * example: + * param: Observable["a"], Observable["b"] + * return: Observable["a", "b"] + */ + public Observable concat(List> source) { + return Observable.create(observableEmitter -> doConcat(source, observableEmitter)); + } + + private void doConcat(List> source, ObservableEmitter emitter) { + if (source.isEmpty()) { + emitter.onComplete(); + } else { + source.get(0) + .doOnComplete(() -> doConcat(source.subList(1, source.size()), emitter)) + .subscribe(string -> emitter.onNext(string)); + } + } + + /* + * example: + * param: Observable["a"], Observable["b"] + * return: Observable["a", "b"] + */ + public Observable merge(List> source) { + return Observable.fromIterable(source).flatMap(e -> e); + } + + /* + * example: + * param: Observable["a", "b", "c"], 1, SECONDS + * return: Observable["a", "b", "c"], 每个元素都延迟1秒 + */ + public Observable delayAll(Observable source, long delay, TimeUnit unit) { + return source.concatMap(e -> Observable.just(e).delay(delay, unit)); + } + +} diff --git a/src/test/java/cn/nextop/rxjava/share/practices/Practice3Test.java b/src/test/java/cn/nextop/rxjava/share/practices/Practice3Test.java index 0e92033..c467ad6 100644 --- a/src/test/java/cn/nextop/rxjava/share/practices/Practice3Test.java +++ b/src/test/java/cn/nextop/rxjava/share/practices/Practice3Test.java @@ -30,4 +30,4 @@ public void test1() { assertTrue(list1.contains(i)); } } -} \ No newline at end of file +}